package com.jourwon.spring.boot.listener;

import com.jourwon.spring.boot.prop.CustomKafkaListenerProperties;
import com.jourwon.spring.boot.prop.CustomKafkaListenerProperty;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpoint;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
 * Registrar for custom Kafka listeners.
 *
 * <p>After its dependencies are injected, this bean walks the listeners declared in
 * {@link CustomKafkaListenerProperties} and registers one listener container per entry
 * with the {@link KafkaListenerEndpointRegistry}. The same registration methods are
 * public so that listeners can also be added later at runtime.
 *
 * @author JourWon
 * @date 2022/3/22
 */
@Component
@EnableConfigurationProperties(CustomKafkaListenerProperties.class)
public class CustomKafkaListenerRegistrar implements InitializingBean {

    @Resource
    private CustomKafkaListenerProperties customKafkaListenerProperties;

    @Resource
    private BeanFactory beanFactory;

    @Resource
    private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

    @Resource
    private KafkaListenerContainerFactory<?> kafkaListenerContainerFactory;

    /**
     * Registers every listener declared in the configuration properties.
     * Containers are registered with {@code startImmediately = false}.
     */
    @Override
    public void afterPropertiesSet() {
        // NOTE(review): guards against an unset listeners property; confirm whether
        // CustomKafkaListenerProperties already defaults this to an empty map.
        if (customKafkaListenerProperties.getListeners() == null) {
            return;
        }
        customKafkaListenerProperties.getListeners().forEach(this::registerCustomKafkaListener);
    }

    /**
     * Registers a listener container for the given listener definition without
     * requesting an immediate start.
     *
     * @param name                        name passed to the listener when it builds its endpoint
     * @param customKafkaListenerProperty listener definition (listener class and topic)
     * @return the id of the registered endpoint
     * @throws IllegalStateException if the configured listener class cannot be found
     */
    public String registerCustomKafkaListener(String name, CustomKafkaListenerProperty customKafkaListenerProperty) {
        return this.registerCustomKafkaListener(name, customKafkaListenerProperty, false);
    }

    /**
     * Registers a listener container for the given listener definition.
     *
     * <p>The listener class name from the property is resolved relative to this
     * registrar's package, the matching bean is fetched from the bean factory, and the
     * endpoint it creates is handed to the endpoint registry.
     *
     * @param name                        name passed to the listener when it builds its endpoint
     * @param customKafkaListenerProperty listener definition (listener class and topic)
     * @param startImmediately            forwarded to
     *                                    {@link KafkaListenerEndpointRegistry#registerListenerContainer}
     * @return the id of the registered endpoint
     * @throws IllegalStateException if the configured listener class cannot be found
     */
    public String registerCustomKafkaListener(String name, CustomKafkaListenerProperty customKafkaListenerProperty,
                                            boolean startImmediately) {
        AbstractCustomMessageListener customMessageListener =
                resolveListener(name, customKafkaListenerProperty.getListenerClass());
        KafkaListenerEndpoint kafkaListenerEndpoint =
                customMessageListener.createKafkaListenerEndpoint(name, customKafkaListenerProperty.getTopic());
        kafkaListenerEndpointRegistry.registerListenerContainer(
                kafkaListenerEndpoint,
                kafkaListenerContainerFactory, startImmediately);
        return kafkaListenerEndpoint.getId();
    }

    /**
     * Looks up the listener bean whose class lives in this registrar's package and has
     * the given simple class name.
     *
     * @param name              listener name, used only for the error message
     * @param listenerClassName class name relative to this registrar's package
     * @return the listener bean obtained from the bean factory
     * @throws IllegalStateException if no class with the resolved name exists
     */
    private AbstractCustomMessageListener resolveListener(String name, String listenerClassName) {
        String listenerClass = String.join(".", CustomKafkaListenerRegistrar.class.getPackage().getName(),
                listenerClassName);
        try {
            return (AbstractCustomMessageListener) beanFactory.getBean(Class.forName(listenerClass));
        } catch (ClassNotFoundException e) {
            // Fail fast with the real cause instead of continuing with a null listener,
            // which would only surface later as an uninformative NullPointerException.
            throw new IllegalStateException(
                    "Kafka listener '" + name + "' refers to unknown listener class: " + listenerClass, e);
        }
    }

}
